{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Ingestion Pipeline + Document Management\n",
    "\n",
    "Attaching a `docstore` to the ingestion pipeline enables document management.\n",
    "\n",
    "Using `document.doc_id` or `node.ref_doc_id` as a grounding point, the ingestion pipeline actively looks for duplicate documents.\n",
    "\n",
    "It works as follows:\n",
    "- A map of `doc_id` -> `document_hash` is stored\n",
    "- If a duplicate `doc_id` is detected and the hash has changed, the document is re-processed\n",
    "- If the hash has not changed, the document is skipped in the pipeline\n",
    "\n",
    "If we do not attach a vector store, we can only check for and remove duplicate inputs.\n",
    "\n",
    "If a vector store is attached, we can also handle upserts! We have [another guide](/examples/ingestion/redis_ingestion_pipeline.ipynb) for upserts and vector stores."
   ]
  },
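  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the bookkeeping above concrete, here is a minimal, standard-library-only sketch of the `doc_id` -> `document_hash` check. It is an illustration only, not the actual `IngestionPipeline` implementation: the helper names `content_hash` and `filter_unchanged` are hypothetical, and hashing the raw text with SHA-256 is an assumption made for this sketch.\n",
    "\n",
    "The second call should return only the edited and the new document, while the unchanged one is skipped."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import hashlib\n",
    "\n",
    "\n",
    "def content_hash(text: str) -> str:\n",
    "    # Assumption: hash the raw text with SHA-256 (illustrative choice)\n",
    "    return hashlib.sha256(text.encode(\"utf-8\")).hexdigest()\n",
    "\n",
    "\n",
    "def filter_unchanged(docs: dict, seen_hashes: dict) -> list:\n",
    "    # Return the doc_ids that are new or changed; update seen_hashes in place\n",
    "    to_process = []\n",
    "    for doc_id, text in docs.items():\n",
    "        new_hash = content_hash(text)\n",
    "        if seen_hashes.get(doc_id) == new_hash:\n",
    "            continue  # same doc_id, same hash: skip\n",
    "        seen_hashes[doc_id] = new_hash  # new doc_id or changed hash\n",
    "        to_process.append(doc_id)\n",
    "    return to_process\n",
    "\n",
    "\n",
    "seen = {}  # hypothetical stand-in for the docstore's doc_id -> hash map\n",
    "first_run = filter_unchanged(\n",
    "    {\"test1.txt\": \"one\", \"test2.txt\": \"two\"}, seen\n",
    ")\n",
    "second_run = filter_unchanged(\n",
    "    {\"test1.txt\": \"NEW one\", \"test2.txt\": \"two\", \"test3.txt\": \"three\"}, seen\n",
    ")\n",
    "print(first_run)\n",
    "print(second_run)"
   ]
  },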
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Seed Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Make some test data\n",
    "!mkdir -p data\n",
    "!echo \"This is a test file: one!\" > data/test1.txt\n",
    "!echo \"This is a test file: two!\" > data/test2.txt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index import SimpleDirectoryReader\n",
    "\n",
    "# load documents with deterministic IDs\n",
    "documents = SimpleDirectoryReader(\"./data\", filename_as_id=True).load_data()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Pipeline with Document Store"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.embeddings import HuggingFaceEmbedding\n",
    "from llama_index.ingestion import IngestionPipeline\n",
    "\n",
    "# SimpleDocumentStore keeps everything in memory; the Redis and Mongo\n",
    "# variants are remote alternatives and are imported here only for reference\n",
    "from llama_index.storage.docstore import (\n",
    "    SimpleDocumentStore,\n",
    "    RedisDocumentStore,\n",
    "    MongoDocumentStore,\n",
    ")\n",
    "from llama_index.text_splitter import SentenceSplitter\n",
    "\n",
    "\n",
    "pipeline = IngestionPipeline(\n",
    "    transformations=[\n",
    "        SentenceSplitter(),\n",
    "        HuggingFaceEmbedding(model_name=\"BAAI/bge-small-en-v1.5\"),\n",
    "    ],\n",
    "    docstore=SimpleDocumentStore(),\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Docstore strategy set to upserts, but no vector store. Switching to duplicates_only strategy.\n"
     ]
    }
   ],
   "source": [
    "nodes = pipeline.run(documents=documents)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Ingested 2 Nodes\n"
     ]
    }
   ],
   "source": [
    "print(f\"Ingested {len(nodes)} Nodes\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### [Optional] Save/Load Pipeline\n",
    "\n",
    "Saving the pipeline persists both the internal cache and the docstore.\n",
    "\n",
    "**NOTE:** If you are using remote caches/docstores, this step is not needed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline.persist(\"./pipeline_storage\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline = IngestionPipeline(\n",
    "    transformations=[\n",
    "        SentenceSplitter(),\n",
    "        HuggingFaceEmbedding(model_name=\"BAAI/bge-small-en-v1.5\"),\n",
    "    ]\n",
    ")\n",
    "\n",
    "# restore the pipeline\n",
    "pipeline.load(\"./pipeline_storage\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Test the Document Management\n",
    "\n",
    "Here, we create a new document and edit an existing one to test the document management.\n",
    "\n",
    "Both the new document and the edited document will be ingested, while the unchanged document will be skipped."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!echo \"This is a test file: three!\" > data/test3.txt\n",
    "!echo \"This is a NEW test file: one!\" > data/test1.txt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "documents = SimpleDirectoryReader(\"./data\", filename_as_id=True).load_data()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Docstore strategy set to upserts, but no vector store. Switching to duplicates_only strategy.\n"
     ]
    }
   ],
   "source": [
    "nodes = pipeline.run(documents=documents)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Ingested 2 Nodes\n"
     ]
    }
   ],
   "source": [
    "print(f\"Ingested {len(nodes)} Nodes\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's confirm which nodes were ingested:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Node: This is a NEW test file: one!\n",
      "Node: This is a test file: three!\n"
     ]
    }
   ],
   "source": [
    "for node in nodes:\n",
    "    print(f\"Node: {node.text}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can also verify that the docstore tracks only three documents:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "3\n"
     ]
    }
   ],
   "source": [
    "print(len(pipeline.docstore.docs))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "llama-index-4a-wkI5X-py3.11",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
